MQTTに到着するズンドコをAWS Iot Eventsでキヨシ判定してみた
1 はじめに
IoT事業部の平内(SIN)です。
ここ、DevelopersIOに、Kinesis Data Analyticsを使い方を超興味深く、紹介している記事があります。
今回、Kinesis Data Analyticsについて、ちょっと調べ物をしている際に、上記の記事にヒットして、改めて読ませて頂いていると・・・もしかすると、ズンドコは、AWS IoT Eventsでも、相性よく捌けるのでは?と思ってしまい、本題を忘れて、このブログを書いてます。
最初に結論ですが、やはり、ストリームを高速に捌くという要件では、Kinesis Data Analyticsが、俄然おすすめです。
本記事は、もし、「ズンドコ要件をIoT Eventsで実装するなら」ぐらいの軽いノリです。どうかお許しください。
2 構成
構成は、以下のようにしました。
入力側は、MQTTを送信するデバイスをイメージしたLambdaで、ランダムに、「ズン」若しくは、「ドコ」を送ります。
到着したメッセージは、そのままAWS IoT Eventsに送られます。
IoT Eventsでは、「ズン」「ズン」「ズン」「ズン」「ドコ」のパターンを検出すると、「キヨシ」というLambdaを実行します。
最終的に、入力側のLambdaと出力側のキヨシLambdaのログを付き合わせて、動作確認する流れです。
詳しい「ズンドコ要件」については、先の記事をご参照ください。
3 入力側Lambda
入力側のLambdaのコードです。内容は、ほぼ、先の記事のままで、Kinesis Data Analyticsに送るところを、MQTTのPublishに置き換えているだけです。
import json import boto3 import random import datetime import time iot = boto3.client('iot-data', region_name='ap-northeast-1') topic = 'zundoko' def lambda_handler(event, context): zundoko_array = [""] * 5 for i in range(300): zundoko = random.choice(['ズン', 'ドコ']) del zundoko_array[0] zundoko_array.append(zundoko) is_zun_zun_zun_zun_doko = zundoko_array == ['ズン', 'ズン', 'ズン', 'ズン', 'ドコ'] if is_zun_zun_zun_zun_doko: kiyoshi_time = datetime.datetime.utcnow() print(kiyoshi_time) payload = { "zundoko": zundoko } try: iot.publish( topic=topic, qos=0, payload=json.dumps(payload, ensure_ascii=False) ) except Exception as e: print(e) break time.sleep(0.1)
実行すると、AWS IoTのコンソールで、到着しているメッセージを確認できます。
4 出力側Lambda
IoT Eventsからinvokeされる、キヨシLambdaです。呼ばれた時間をLogに記録しているだけです。
import json import datetime def lambda_handler(event, context): print(json.dumps(event)) print(datetime.datetime.utcnow()) return
5 IoT Events
(1) 入力
入力は、到着していたメッセージをそのまま使って、Zundokoという名前で作成しました。
(2) 探知機モデル
探知機モデルの状態は、2つです。
- Idle
- StandBy
2つの状態は、下記の条件で移行します。
toStandBy "ズン"を受信した際に、StandBy状態に移行する
$input.Zundoko.zundoko == "ズン"
toIdle "ドコ"を受信した際に、Idle状態に移行する
$input.Zundoko.zundoko == "ドコ"
StandBy状態では、以下の3つのイベントが定義されています。
- OnEnter
- OnInput
- OnExit
OnEnterでは、変数counterを0に初期化します。
OnInputでは、「ズン」が到着した時に、counterをインクリメントしています。
OnExitでは、counterが3以上の場合に、キヨシLambdaをInvokeしています。 「StandBy」状態に居ると言うことは、「ズン」の受信が続いていることになりますが、「ドコ」が到着して「Idle」に遷移する際に、counterを確認してみて、4回「ズン」が続いている場合は、「パターンが成立している」ので、「キヨシ」の実行となる流れです。
6 結果
結果です。入力側のLambdaのログと、出力側のLambdaの時間を確認してみると、概ね1秒程度で「キヨシ」が、実行されているようです。
入力側 | 出力側 |
---|---|
2021-08-01 11:03:15.783485 | 2021-08-01 11:03:16.512271 |
2021-08-01 11:03:21.147276 | 2021-08-01 11:03:22.045987 |
2021-08-01 11:03:24.510608 | 2021-08-01 11:03:25.232286 |
2021-08-01 11:03:27.616386 | 2021-08-01 11:03:28.728645 |
2021-08-01 11:03:30.414850 | 2021-08-01 11:03:31.233361 |
2021-08-01 11:03:32.989325 | 2021-08-01 11:03:33.754709 |
2021-08-01 11:03:35.209572 | 2021-08-01 11:03:36.036711 |
7 最後に
今回、IoT Eventsで、「ズンドコ」パターンの検出を行ってみました。「過去の状態に基づいて、条件判断する」という要件は、変数や状態を保持することができる、IoT Eventsで処理することが可能です。
ただ、最初に記載しました通り、ストリームを高速に捌くという意味では、Kinesisに軍配が上がると思います。